package org.alive.learn.nio.reactor;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.locks.ReentrantLock;


/**
 * Nio 线程，专门负责nio read,write 本类是实例行代码，不会对nio,断线重连，写半包等场景进行处理,旨在理解
 * Reactor模型（多线程版本）
 * 
 * @author dingwei2
 * 
 */
public class SubReactorThread extends Thread {

	private Selector selector;
	private ExecutorService businessExecutorPool;
	private List<NioTask> taskList = new ArrayList<NioTask>(512);
	private ReentrantLock taskMainLock = new ReentrantLock();

	/**
	 * 业务线程池
	 * 
	 * @param businessExecutorPool
	 */
	public SubReactorThread(ExecutorService businessExecutorPool) {
		try {
			this.businessExecutorPool = businessExecutorPool;
			this.selector = Selector.open();
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}

	/**
	 * socket channel
	 * 
	 * @param socketChannel
	 */
	public void register(NioTask task) {
		if (task != null) {
			try {
				taskMainLock.lock();
				taskList.add(task);
			} finally {
				taskMainLock.unlock();
			}
		}
	}

	// private

	public void run() {
		while (!Thread.interrupted()) {
			Set<SelectionKey> ops = null;
			try {
				selector.select(1000);
				ops = selector.selectedKeys();
			} catch (IOException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
				continue;
			}

			// 处理相关事件
			for (Iterator<SelectionKey> it = ops.iterator(); it.hasNext();) {
				SelectionKey key = it.next();
				SocketChannel clientChannel = (SocketChannel) key.channel();
				it.remove();

				try {
					if (key.isWritable()) { // 向客户端发送请求
						
						ByteBuffer buf = (ByteBuffer) key.attachment();
						buf.flip();
						clientChannel.write(buf);
						System.out.println("服务端向客户端发送数据。。。");
						// 重新注册读事件
						clientChannel.register(selector, SelectionKey.OP_READ);
					} else if (key.isReadable()) { // 接受客户端请求
						System.out.println("服务端接收客户端连接请求。。。");
						ByteBuffer buf = ByteBuffer.allocate(1024);
						System.out.println(buf.capacity());
						clientChannel.read(buf);// 解析请求完毕

						// 转发请求到具体的业务线程；当然，这里其实可以向dubbo那样，支持转发策略，如果执行时间短，
						// ，比如没有数据库操作等，可以在io线程中执行。本实例，转发到业务线程池
						dispatch(clientChannel, buf);

					}
				} catch (Throwable e) {
					// e.printStackTrace();
					System.out.println("客户端主动断开连接。。。。。。。");
					try {
						clientChannel.close();
					} catch (IOException e1) {
						// TODO Auto-generated catch block
						e1.printStackTrace();
					}
				}

			}

			// 注册事件
			if (!taskList.isEmpty()) {
				try {
					taskMainLock.lock();
					for (Iterator<NioTask> it = taskList.iterator(); it.hasNext();) {
						NioTask task = it.next();
						try {
							SocketChannel sc = task.getSc();
							if (task.getData() != null) {
								sc.register(selector, task.getOp(), task.getData());
							} else {
								sc.register(selector, task.getOp());
							}

						} catch (Throwable e) {
							e.printStackTrace();// ignore
						}
						it.remove();
					}

				} finally {
					taskMainLock.unlock();
				}
			}

		}
	}

	/**
	 * 此处的reqBuffer处于可写状态
	 * 
	 * @param sc
	 * @param reqBuffer
	 */
	private void dispatch(SocketChannel sc, ByteBuffer reqBuffer) {
		businessExecutorPool.submit(new Handler(sc, reqBuffer, this));
	}
}
